流动的 Word Count¶
Note
今天的大数据处理,对于延迟性的要求越来越高,因此流处理的基本概念与工作原理,是每一个大数据从业者必备的“技能点”。
我们从一个”流动的 Word Count“入手,去学习一下在流计算的框架下,Word Count 是怎么做的。
结构¶
在流计算场景中,Source 是流计算的数据源头;流处理引擎在数据流动过程中实现数据处理,保证数据完整性与一致性;Sink 指的是数据流向的目的地。
Source¶
在“流动的 Word Count”里,数据以行为粒度,分批地“喂给”Spark,每一行数据,都会触发一次 Job 计算。
具体来说,我们使用 netcat 工具,向本地 9999 端口的 Socket 地址发送数据行:
而 Spark 流处理应用,则时刻监听着本机的 9999 端口,一旦接收到数据条目,就会立即触发计算逻辑的执行。
流处理引擎¶
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("stream word count").getOrCreate()
# 从监听地址创建 DataFrame
df = (spark.readStream
.format("socket")
.option("host", "127.0.0.1")
.option("port", 9999)
.load())
import pyspark.sql.functions as F
# 先把字符串以空格为分隔符做拆分,得到单词数组 words
# 再把数组 words 展平为单词 word
df = (df
.withColumn("words", F.split("value", " "))
.withColumn("word", F.explode("words"))
.groupBy("word").count())
Sink¶
在 Complete mode 下,每一批次的计算结果,都会包含系统到目前为止处理的全部数据内容。
在 Update mode 下,每个批次仅输出内容有变化的数据记录。
# 指定Sink为终端
# 指定输出选项
# 指定输出模式
# 启动流处理应用
# 等待中断指令
(df.writeStream.format("console")
.option("truncate", False)
.outputMode("complete")
.start()
.awaitTermination())